/*
 * Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
 */
package com.lightbend.lagom.javadsl.persistence;

import akka.Done;
import akka.event.Logging;
import akka.japi.Pair;
import akka.stream.javadsl.Flow;
import org.pcollections.PSequence;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
 * Base class for read side processors.
 *
 * A read side processor consumes the events produced by
 * {@link com.lightbend.lagom.javadsl.persistence.PersistentEntity} instances and uses them to update a read side
 * data store that is optimized for queries.
 *
 * Only tagged events can be consumed, and one read side may consume events of one or more tags. Events are usually
 * tagged according to some supertype of event; for example, events may be tagged as {@code Order} events. They may
 * also be tagged according to a hash of the ID of the entity associated with the event, which allows read side event
 * handling to be sharded across many nodes. Tagging is done using
 * {@link com.lightbend.lagom.javadsl.persistence.AggregateEventTag}.
 *
 * A read side processor is responsible for tracking which events it has already seen. This is done using offsets,
 * which are sequential values associated with each event. End users typically do not need to handle offsets
 * themselves: that is provided by the Lagom support specific to the read side datastore, so end users can focus on
 * handling the events.
 */
public abstract class ReadSideProcessor<Event extends AggregateEvent<Event>> {

    /**
     * The tags to aggregate.
     *
     * At least one tag must be returned. Read side processors are sharded over the cluster by these tags, so if
     * events are tagged by a shard key, the read side processing load can be distributed across the cluster.
     *
     * @return The tags to aggregate.
     */
    public abstract PSequence<AggregateEventTag<Event>> aggregateTags();

    /**
     * Build the {@link ReadSideHandler} for this processor.
     *
     * @return The handler that does the actual read side handling.
     */
    public abstract ReadSideHandler<Event> buildHandler();

    /**
     * The name of this read side.
     *
     * The name should be unique among the read sides and entity types of the service. Unless overridden, it is the
     * short class name of the concrete {@code ReadSideProcessor} class. Because this name is used to identify read
     * sides throughout the cluster, it is wise to override this method and retain the original name when the class
     * is renamed.
     *
     * @return The name of this read side.
     */
    public String readSideName() {
        final Class<?> concreteType = getClass();
        return Logging.simpleName(concreteType);
    }

    /**
     * A read side handler.
     *
     * This is responsible for the actual read side handling, including handling offsets and the events themselves.
     */
    public abstract static class ReadSideHandler<Event extends AggregateEvent<Event>> {

        /**
         * Prepare the database for all processors.
         *
         * This is invoked at system startup. It is guaranteed to only be invoked once at a time across the entire
         * cluster, which makes it a safe place for actions, such as creating tables, that could cause problems if
         * done from multiple nodes.
         *
         * It is invoked again if it fails, and it may be invoked multiple times as nodes of the cluster go up or
         * down. Unless the entire system is restarted, there is no way to guarantee that it will be invoked at a
         * particular time - in particular, it should not be used for doing upgrades unless the entire system is
         * restarted and a new cluster built from scratch.
         *
         * The default implementation does nothing and returns an already completed stage.
         *
         * @return A completion stage that is redeemed when preparation is finished.
         */
        public CompletionStage<Done> globalPrepare() {
            final Done nothingToPrepare = Done.getInstance();
            return CompletableFuture.completedFuture(nothingToPrepare);
        }

        /**
         * Prepare this processor.
         *
         * The primary purpose of this method is to load the last offset that was processed, so that read side
         * processing can continue from that offset.
         *
         * It is also an opportunity for processors to do any initialisation activities, such as creating or
         * updating database tables, or migrating data.
         *
         * This is invoked at least once for each tag, and may be invoked multiple times, such as in the event of
         * failure.
         *
         * The default implementation returns an already completed stage holding {@link Offset#NONE}.
         *
         * @param tag The tag to get the offset for.
         * @return A completion stage that is redeemed with the offset for the tag when preparation is finished.
         */
        public CompletionStage<Offset> prepare(AggregateEventTag<Event> tag) {
            final Offset noOffset = Offset.NONE;
            return CompletableFuture.completedFuture(noOffset);
        }

        /**
         * Flow to handle the events.
         *
         * If the handler does any blocking, this flow should be configured to use a dispatcher that is configured
         * to allow for that blocking.
         *
         * @return The flow that handles each event together with its offset.
         */
        public abstract Flow<Pair<Event, Offset>, Done, ?> handle();
    }
}
